fix(sqs): make visibility timeout and batch size configurable #6
The SQS client had a hardcoded VisibilityTimeout of 36000 seconds (10 hours), which caused the ApproximateAgeOfOldestMessage metric to climb to ~10h before messages expired from in-flight status. This directly triggered recurring PagerDuty alerts on the elasticsearch_indexer_k8s queue.

Changes:
- Add functional options pattern (WithVisibilityTimeout, WithMaxMessages, WithWaitTimeSeconds) so callers can configure per-queue settings
- Change defaults to sensible values: 120s visibility, 10 max messages, 20s wait time
- Add ReceiveBatch() method that returns all messages (up to max)
- Keep Receive() backwards-compatible (returns first message only)
- Existing callers that pass no options get the new defaults automatically

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
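As a rough illustration of the functional options pattern described above, here is a minimal sketch. The option names and the defaults (120s, 10, 20s) come from the description; the `Client` fields, the single-argument `NewClient` signature, the `clamp` helper, and the queue URL are assumptions made for the example, not the actual code in sqs/sqs.go. The bounds used are the AWS SQS limits (0 to 43200 seconds visibility, 1 to 10 messages, 0 to 20 seconds wait).

```go
package main

import "fmt"

// Client is a hypothetical stand-in for the real SQS client.
type Client struct {
	queueURL          string
	visibilityTimeout int64
	maxMessages       int64
	waitTimeSeconds   int64
}

// Option configures a Client at construction time.
type Option func(*Client)

// clamp is an illustrative helper that bounds v to [lo, hi].
func clamp(v, lo, hi int64) int64 {
	if v < lo {
		return lo
	}
	if v > hi {
		return hi
	}
	return v
}

// WithVisibilityTimeout sets the visibility timeout in seconds (SQS allows 0 to 43200).
func WithVisibilityTimeout(seconds int64) Option {
	seconds = clamp(seconds, 0, 43200)
	return func(c *Client) { c.visibilityTimeout = seconds }
}

// WithMaxMessages sets the batch size per receive call (SQS allows 1 to 10).
func WithMaxMessages(n int64) Option {
	n = clamp(n, 1, 10)
	return func(c *Client) { c.maxMessages = n }
}

// WithWaitTimeSeconds sets the long-poll wait time (SQS allows 0 to 20).
func WithWaitTimeSeconds(seconds int64) Option {
	seconds = clamp(seconds, 0, 20)
	return func(c *Client) { c.waitTimeSeconds = seconds }
}

// NewClient applies the defaults first, then any caller-supplied options.
// The real constructor takes more positional arguments; one is used here for brevity.
func NewClient(queueURL string, opts ...Option) *Client {
	c := &Client{
		queueURL:          queueURL,
		visibilityTimeout: 120,
		maxMessages:       10,
		waitTimeSeconds:   20,
	}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	// Existing call sites pass no options and pick up the new defaults.
	legacy := NewClient("https://example.invalid/queue")
	// New call sites tune per-queue settings; out-of-range values are clamped.
	tuned := NewClient("https://example.invalid/queue",
		WithVisibilityTimeout(300), WithMaxMessages(50))

	fmt.Println(legacy.visibilityTimeout, legacy.maxMessages, legacy.waitTimeSeconds)
	fmt.Println(tuned.visibilityTimeout, tuned.maxMessages, tuned.waitTimeSeconds)
}
```

Because the options are variadic, call sites written before this change keep compiling unchanged, which is the backwards-compatibility property the description relies on.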
Walkthrough
The client adds a functional options pattern (…
Pre-merge checks: 2 passed
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@sqs/sqs.go`:
- Around line 81-107: The Receive method currently calls ReceiveBatch which
requests up to c.maxMessages from SQS then returns only the first message,
causing other messages to be hidden and lost; fix by making Receive perform a
true single-message poll (set MaxNumberOfMessages=1) rather than relying on
ReceiveBatch — either implement Receive to call the SQS ReceiveMessage API with
MaxNumberOfMessages=1 (or add a private helper that takes maxMessages and use it
for both Receive and ReceiveBatch) so Receive returns at most one message while
leaving ReceiveBatch behavior unchanged (reference Receive, ReceiveBatch,
c.maxMessages, and the SQS ReceiveMessage call).
- Around line 23-50: WithVisibilityTimeout and WithWaitTimeSeconds currently
accept any int64 and should validate/clamp to AWS SQS limits like
WithMaxMessages does; update the option closures for WithVisibilityTimeout to
clamp seconds to the 0–43200 range before assigning to Client.visibilityTimeout,
and update WithWaitTimeSeconds to clamp seconds to the 0–20 range before
assigning to Client.waitTimeSeconds so invalid configs are caught early (follow
the same pattern used in WithMaxMessages).
- Receive() was delegating to ReceiveBatch(), which fetches up to maxMessages, then returning only the first. The remaining messages became invisible for the visibility timeout but were never deleted. Now uses a private receiveMessages(1) helper so only 1 message is fetched from SQS.
- WithVisibilityTimeout now clamps to 0–43200 (AWS SQS limit).
- WithWaitTimeSeconds now clamps to 0–20 (AWS SQS limit).
- Follows the same validation pattern already used in WithMaxMessages.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
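The shape of that fix can be sketched as follows. Only the names Receive, ReceiveBatch, receiveMessages, and maxMessages come from the thread; `fakeQueue` is a hypothetical in-memory stand-in for SQS (a receive moves messages from visible to in-flight), and the return types are simplified to strings for the example.

```go
package main

import "fmt"

// fakeQueue is a hypothetical stand-in for SQS: receiving a message
// hides it (moves it in flight) until it is deleted or times out.
type fakeQueue struct {
	visible  []string
	inFlight []string
}

// receive hands out up to max visible messages and marks them in flight.
func (q *fakeQueue) receive(max int) []string {
	if max > len(q.visible) {
		max = len(q.visible)
	}
	batch := append([]string(nil), q.visible[:max]...)
	q.visible = q.visible[max:]
	q.inFlight = append(q.inFlight, batch...)
	return batch
}

// Client is a simplified, illustrative version of the SQS client.
type Client struct {
	queue       *fakeQueue
	maxMessages int
}

// receiveMessages is the shared private helper: the caller decides
// how many messages are requested from the queue.
func (c *Client) receiveMessages(max int) []string {
	return c.queue.receive(max)
}

// ReceiveBatch requests up to maxMessages and returns all of them.
func (c *Client) ReceiveBatch() []string {
	return c.receiveMessages(c.maxMessages)
}

// Receive requests exactly one message, so no extra messages are
// hidden from other consumers without being processed.
func (c *Client) Receive() (string, bool) {
	msgs := c.receiveMessages(1)
	if len(msgs) == 0 {
		return "", false
	}
	return msgs[0], true
}

func main() {
	q := &fakeQueue{visible: []string{"a", "b", "c"}}
	c := &Client{queue: q, maxMessages: 10}

	msg, ok := c.Receive()
	// Only one message is in flight; the other two remain visible.
	fmt.Println(msg, ok, len(q.visible), len(q.inFlight))

	// The batch path still drains up to maxMessages.
	fmt.Println(c.ReceiveBatch())
}
```

Had Receive delegated to ReceiveBatch in this model, all three messages would have moved in flight after the first call while only "a" reached the caller, which is the behavior the review flagged.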
🧹 Nitpick comments (1)
sqs/sqs.go (1)
25-34: Move parameter clamping outside closures to eliminate mutation of captured values.
The three Option builders (WithVisibilityTimeout, WithMaxMessages, WithWaitTimeSeconds) mutate captured parameters inside their closures. While the current usage pattern is sequential, this design creates a potential concurrency hazard if an Option instance is ever reused or shared across goroutines. Clamp the parameters before returning the closure instead.
Also applies to: lines 40–48, 54–62
♻️ Proposed refactor
     func WithVisibilityTimeout(seconds int64) Option {
    +	if seconds < 0 {
    +		seconds = 0
    +	}
    +	if seconds > 43200 {
    +		seconds = 43200
    +	}
     	return func(c *Client) {
    -		if seconds < 0 {
    -			seconds = 0
    -		}
    -		if seconds > 43200 {
    -			seconds = 43200
    -		}
     		c.visibilityTimeout = seconds
     	}
     }

     func WithMaxMessages(n int64) Option {
    +	if n < 1 {
    +		n = 1
    +	}
    +	if n > 10 {
    +		n = 10
    +	}
     	return func(c *Client) {
    -		if n < 1 {
    -			n = 1
    -		}
    -		if n > 10 {
    -			n = 10
    -		}
     		c.maxMessages = n
     	}
     }

     func WithWaitTimeSeconds(seconds int64) Option {
    +	if seconds < 0 {
    +		seconds = 0
    +	}
    +	if seconds > 20 {
    +		seconds = 20
    +	}
     	return func(c *Client) {
    -		if seconds < 0 {
    -			seconds = 0
    -		}
    -		if seconds > 20 {
    -			seconds = 20
    -		}
     		c.waitTimeSeconds = seconds
     	}
     }

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sqs/sqs.go` around lines 25 - 34, The Option builders (WithVisibilityTimeout, WithMaxMessages, WithWaitTimeSeconds) currently clamp their input parameters inside the returned closure which mutates captured variables and can cause concurrency hazards if the Option is reused; move the clamping logic out of the closure by normalizing the input (apply min/max limits) before returning the func(c *Client) closure so the closure only assigns the already-clamped value to fields (visibilityTimeout, maxMessages, waitTimeSeconds) without mutating captured variables.
Summary
- VisibilityTimeout of 36,000 seconds (10 hours), which directly caused recurring PagerDuty alerts on the elasticsearch_indexer_k8s queue (ApproximateAgeOfOldestMessage climbing to ~10h)
- MaxNumberOfMessages: 1, limiting throughput to 1 message per receive call
- ReceiveBatch() method returns all fetched messages; existing Receive() stays backwards-compatible

Changes
- WithVisibilityTimeout()
- WithMaxMessages()
- WithWaitTimeSeconds()

Backwards Compatibility
- NewClient() accepts variadic ...Option, so existing callers passing 3 args still compile
- Receive() still returns a single message (delegates to ReceiveBatch internally)

Test plan
- reply-mongo-to-es worker still compiles and works with the new defaults
- publish-es-indexer with the updated library and confirm the SQS age-of-oldest-message alert resolves

🤖 Generated with Claude Code